package com.example.dobs.demo.flink.io;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

/**
 * Custom Flink source that repeatedly re-reads a text file and emits every line downstream.
 *
 * <p>On each scan the whole file is read from the beginning and each line is emitted as one
 * record, so the same lines are emitted again on every scan. Between scans the source sleeps
 * for the configured interval. A failure to open or read the file is reported on standard
 * error and the scan is retried after the next sleep; it does not fail the job.
 *
 * <p>date: 2021-01-23
 *
 * @author Wang Liwei
 */
public class MyDataStreamSourceTest extends RichSourceFunction<String> {
    /** File scanned when the no-argument constructor is used. */
    private static final String DEFAULT_FILE_URI = "file:///Users/msxr/develop/var/tmp/employee.txt";
    /** Pause between two scans when the no-argument constructor is used. */
    private static final long DEFAULT_SCAN_INTERVAL_MILLIS = 10000L;

    // Written by cancel() and read by run(); volatile makes the update visible across threads.
    private volatile boolean isRunning = true;
    private final Path path;
    private final long scanIntervalMillis;

    /** Creates a source that scans the default file every 10 seconds. */
    public MyDataStreamSourceTest() {
        this(new Path(DEFAULT_FILE_URI), DEFAULT_SCAN_INTERVAL_MILLIS);
    }

    /**
     * Creates a source that scans the given file at the given interval.
     *
     * @param path               file to read on every scan; must not be {@code null}
     * @param scanIntervalMillis pause between two scans in milliseconds; must not be negative
     * @throws IllegalArgumentException if {@code path} is {@code null} or the interval is negative
     */
    public MyDataStreamSourceTest(Path path, long scanIntervalMillis) {
        if (path == null) {
            throw new IllegalArgumentException("path must not be null");
        }
        if (scanIntervalMillis < 0) {
            throw new IllegalArgumentException(
                    "scanIntervalMillis must not be negative: " + scanIntervalMillis);
        }
        this.path = path;
        this.scanIntervalMillis = scanIntervalMillis;
    }

    /**
     * Scans the file in a loop until {@link #cancel()} is called, emitting each line to
     * {@code ctx}.
     *
     * @param ctx context used to emit records
     * @throws Exception if the thread is interrupted while sleeping, or if emitting a record
     *                   fails with a non-I/O error
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            try {
                emitFileLines(ctx);
            } catch (IOException e) {
                // Best effort: a missing or unreadable file must not stop the source; the next
                // scan retries. Only I/O failures are tolerated here so that errors raised while
                // emitting (for example from downstream operators) are not hidden.
                System.err.println("Failed to read " + path + "; retrying on the next scan");
                e.printStackTrace();
            }
            Thread.sleep(scanIntervalMillis);
        }
    }

    /**
     * Reads the file once from the beginning and emits every line.
     *
     * <p>The stream and reader are closed when the scan ends, whether normally or with an
     * exception, so repeated scans do not accumulate open file handles.
     */
    private void emitFileLines(SourceContext<String> ctx) throws IOException {
        FileSystem fileSystem = path.getFileSystem();
        try (FSDataInputStream in = fileSystem.open(path);
             BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            // Re-check the flag per line so cancellation does not wait for the end of the file.
            while (isRunning && (line = reader.readLine()) != null) {
                // Emit under the checkpoint lock, as the SourceFunction contract recommends.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                }
            }
        }
    }

    /** Requests the scan loop in {@link #run(SourceContext)} to stop. */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Demo entry point: runs the source in a local environment with the web UI, replaces every
     * comma in each line with {@code " == "}, and prints the result.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> streamSource = env.addSource(new MyDataStreamSourceTest());
        SingleOutputStreamOperator<String> outputStreamOperator = streamSource.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.replace(",", " == ");
            }
        });
        outputStreamOperator.print();
        env.execute("wlw_test");
    }
}
